Add the ability to create IndexPartition based on the desired number of documents per split #812
Conversation
@costin @jbaiera, this PR introduces the SlicePartitionDefinition. I had to refactor (clean up) some parts to make it work. This is still a work in progress and I need to add more tests. In the meantime it would be very helpful if you could check the progress. Considering the size of the PR, I am available at your convenience for a deep dive.
Force-pushed from e1ca9af to fcd5fdb
@@ -410,7 +408,7 @@ public void refresh(Resource resource) {
     sb.setLength(sb.length() - 1);
     sb.append("\n]}");

-    if (!isES20) {
+    if (esVersion().equals(EsVersion.V_2_X)) {
Should this be !esVersion().equals(EsVersion.V_2_X)?
Actually, this should just check whether the version is equal to V_1_X. This line is balancing the curly braces.
This is a very large PR. I'll be leaving comments as I find items.
    return version.startsWith("2.");
}

/**
 * Whether the settings indicate an ES 5.0.x (which introduces breaking changes) or otherwise.
Are you going to remove this settings check as well in favor of the new route? I'm assuming this is an item that is still in progress...
Jim, thanks again for starting to work on this. A couple of things off the top of my head:

A. Regarding the supported ES versions: currently ES-Hadoop supports 1.x and 2.x, and master adds support for 5.x. Going forward, it remains to be decided whether support for 1.x will be kept; either way I would not bother with 0.x.

B. Also, things like the mapping are resolved eagerly to avoid having each client/task do that: for N tasks, there is currently only one mapping fetch (which also does some validation); if it is lazy, that translates to N mapping calls.

Fwiw, I favor keeping the new slice feature in a separate package and trying to keep the old code around to make the transition and code porting between the 2.x and 5.x branches easy.
Force-pushed from dbd09ce to 2000457
Thanks @costin and @jbaiera for the first round of review.
I split the big commit into smaller pieces. I've intentionally kept the errors spotted by your review in the original commits; the commits that address your comments come last. I hope this helps the review process.
I don’t see it that way. IMO, keeping the mapping (and the config) in each PartitionDefinition is worse than retrieving it lazily when the task is initialized. The mapping (and the config) can be big, and it's even worse with the base64 conversion. Furthermore, if the mapping is resolved when the partitions are created, there is a high probability that a mapping update happens between the initialization of the job and the execution of the task.
There should be no code porting between 2.x and 5.x. There is no breaking change for the available clients in the repo. The changes are internal; the only thing clients need to change is the new configuration option that sets the expected number of documents per task.
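The lazy-resolution idea argued for above can be sketched as follows. This is a hypothetical illustration, not the actual PartitionDefinition code: the split carries only its coordinates, and heavyweight state such as the mapping is fetched once inside the task rather than being serialized (base64-encoded) into every split by the driver.

```java
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical sketch of a split that resolves its mapping lazily.
public class LazyPartitionSketch implements Serializable {
    private final String index;
    private final int shardId;
    // transient: resolved inside the task on first use, never shipped over the wire
    private transient String mapping;

    public LazyPartitionSketch(String index, int shardId) {
        this.index = index;
        this.shardId = shardId;
    }

    // 'fetcher' stands in for a REST call such as GET /{index}/_mapping;
    // it runs at most once per task, when the partition is first consumed.
    public synchronized String mapping(Supplier<String> fetcher) {
        if (mapping == null) {
            mapping = fetcher.get();
        }
        return mapping;
    }

    public String index() { return index; }
    public int shardId() { return shardId; }
}
```

The trade-off discussed in the thread is visible here: N tasks mean up to N mapping fetches, but each split stays small and the mapping observed is the one current at task start.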
Map<String, String> version = get("", "version");
if (version == null || !StringUtils.hasText(version.get("number"))) {
    return "Unknown";

public synchronized EsVersion esVersion() {
I think this should be reverted back to being retrieved from the configuration at construction time. This method is called many times in the client and the synchronized nature of it is not great for performance. There really shouldn't be a scenario where the value of the version will be different between tasks, so it's reasonable for the version information to be stored once for the entire job.
Right, we discussed this with @costin and he said approximately the same, so I removed it.
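The change agreed on above can be sketched like this (hypothetical class and enum names, not the actual RestClient code): resolve the cluster version once at construction time, so the hot-path getter is a plain field read with no synchronization.

```java
// Hypothetical sketch: version resolved once, immutable afterwards.
public class VersionedClientSketch {
    public enum EsVersion { V_1_X, V_2_X, V_5_X }

    private final EsVersion esVersion; // set once in the constructor

    public VersionedClientSketch(String versionNumber) {
        // In the real client this would come from the configuration or a
        // single version check at job setup, shared by all tasks.
        this.esVersion = parse(versionNumber);
    }

    static EsVersion parse(String number) {
        if (number.startsWith("1.")) return EsVersion.V_1_X;
        if (number.startsWith("2.")) return EsVersion.V_2_X;
        return EsVersion.V_5_X;
    }

    // No synchronized keyword needed: the field is final and never changes.
    public EsVersion esVersion() { return esVersion; }
}
```

Since the version cannot legitimately differ between tasks of the same job, storing it once removes the lock contention the review comment points out.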
@jbaiera about the Elasticsearch version discovery in the RestClient: I pushed f101370. This change retrieves an internal version from the settings and uses it to build the requests.
Re: Data Locality - If we're all +1 on removing support for the locality features, then the docs should probably be updated to remove the mentions of data locality as well.
This commit changes how we split the query into multiple partitions.

For clusters running a version prior to v5.x:
* We create one partition for each shard of each requested index.
* If an alias is requested, the search routing and the alias filter are respected.
* The partition is no longer attached to a node or an IP. Only the shard id and index name are defined, in order to be able to use any replica in the cluster when the partition is consumed. This makes retries possible if a node disappears during a job.
* The ability to consume a partition on the node that is responsible for the index/shard has been removed temporarily and should be re-added in a follow-up.

For clusters running v5.x:
* We first split by index, then by shard, and finally by the maximum number of documents allowed per partition (configurable through the new option named es.input.maxdocsperpartition). For instance, for an index with 5 shards, 1M documents, and a maximum of 100,000 documents per partition, a match-all query would be split into 50 partitions, 10 partitions per shard.
* If an alias is requested, the search routing and the alias filter are respected.

Fixes elastic#778
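The v5.x example in the commit message can be checked with a short sketch. This mirrors the arithmetic as stated there (slices per shard derived from the document count and es.input.maxdocsperpartition); it is an illustration, not the actual ES-Hadoop splitting code.

```java
// Hypothetical sketch of the slice-count arithmetic from the example above.
public class SliceMath {
    static long slicesPerShard(long docCount, long maxDocsPerPartition) {
        // ceiling division, so a final smaller-than-max slice is still counted
        return (docCount + maxDocsPerPartition - 1) / maxDocsPerPartition;
    }

    public static void main(String[] args) {
        long perShard = slicesPerShard(1_000_000, 100_000); // 10 per shard
        long total = 5 * perShard;                          // 50 partitions overall
        System.out.println(perShard + " slices per shard, " + total + " partitions total");
    }
}
```

With 1M documents and a 100,000-document cap this yields 10 slices per shard, and 5 shards give the 50 partitions quoted in the commit message.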
@jbaiera @costin I've updated the description of the PR with the latest status.
public RawQueryBuilder(Map<String, Object> map) throws IOException {
    Object query = map;
    if (map.containsKey("query")) {
        query = map.remove("query");
This line right here breaks backwards compatibility with ES v1.7.3 for SparkSQL. We have code that hooks into Spark that allows us to make sense of some of the SQL operations being executed and translate those operations into pushdown filters and queries.
In the case of running against ES 1.7.3, the framework will detect the version and will create a query context with the pushdown query underneath it to allow compatibility with the filter context. Because all the filters are passed through the parsing code listed above before building the final query, the query context marker is removed from the JSON and the query ceases to compile from within the filter context.
Basically, the filter fragment goes from being this:
{ "query": { "match" : { "..." : "..." } } }
to this:
{ "match": { "..." : "..." } }
which is then inserted into a filter context and subsequently comes back from the server with the following error:
org.elasticsearch.search.SearchParseException: [defaultstrictspark-test][2]: from[-1],size[-1]: Parse Failure [Failed to parse source [{"query":{"filtered":{"query":{"match_all":{}},"filter":{"match":{"airport":"OTP"}}}}}]]
at org.elasticsearch.search.SearchService.parseSource(SearchService.java:747)
<...>
Caused by: org.elasticsearch.index.query.QueryParsingException: [defaultstrictspark-test] No filter registered for [match]
This probably isn't a huge blocker for the alpha release, but it is a big deal if we want to continue the support for 1.x.
Local integration tests are looking good for 5.x and 2.x Elasticsearch versions. Ran into some snags with 1.x surrounding SparkSQL that I have commented on above.
For Elasticsearch versions prior to 5.x, the raw query builder should be able to parse a query that wraps a filter. Since we use the RawQueryBuilder to parse root queries as well, we need to differentiate the parsing of a query from the parsing of a filter. For the former we need to remove the root "query" object if it exists; for the latter we need to keep it, because it only means that the filter is wrapped in a query. This change also adds javadoc for the query builders.
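The distinction the commit describes can be sketched as follows. This is a hypothetical illustration (not the actual RawQueryBuilder, which removes the key in place): a root query drops its redundant top-level "query" wrapper, while a filter fragment keeps it so the 1.x filtered-query compatibility shim still compiles inside a filter context.

```java
import java.util.Map;

// Hypothetical sketch of query-vs-filter parsing for pre-5.x query DSL.
public class QueryUnwrapSketch {
    static Object parse(Map<String, Object> map, boolean isFilter) {
        // Root query: { "query": { "match": ... } } -> { "match": ... }
        // Filter:     { "query": { "match": ... } } stays intact, because the
        // "query" key means the filter wraps a query (1.x filter context).
        if (!isFilter && map.containsKey("query")) {
            return map.get("query");
        }
        return map;
    }
}
```

Under this rule the filter fragment from the review comment above keeps its query-context marker, avoiding the "No filter registered for [match]" parse failure on 1.x.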
Good catch @jbaiera. I pushed a fix to handle the 1.x query DSL. Regarding data locality, I don't think it works as expected in the current version (prior to this PR). The preference API is used to restrict the query to the node where the targeted shard is present, but the REST client will use a random node to send the query, which is ultimately redirected to the targeted node. IMO this extra round trip makes the data locality useless. The task can run on the same machine as the node that hosts the shard, but since we use a random node to do the coordination, there is no advantage in doing this.
Just discovered that SettingsUtils.pinNode is supposed to restrict the client to a node. This means that my last comment about data locality is wrong ;).
Handle the new format for NodeInfo with Elasticsearch 5.x:
* http_address has been removed
* the list of roles is declared in a dedicated section named 'roles' (instead of parameters)

This change fixes the discovery of the nodes when Elasticsearch 5.x is used. It also fixes the client/data-only mode where only allowed nodes are requested.
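The format change the commit handles can be sketched like this. This is a hypothetical illustration (class and method names are made up, not the actual NodeInfo parser): in 5.x a node's roles arrive as a dedicated "roles" array, while earlier versions expose them through node attributes such as data=false.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of role detection across node-info formats.
public class NodeInfoSketch {
    static boolean isDataNode(Map<?, ?> nodeInfo) {
        Object roles = nodeInfo.get("roles");
        if (roles instanceof List) {
            // 5.x format: { "roles": ["master", "data", "ingest"] }
            return ((List<?>) roles).contains("data");
        }
        // pre-5.x format: a node is a data node unless data=false is set
        Object attrsObj = nodeInfo.get("attributes");
        Map<?, ?> attrs = (attrsObj instanceof Map) ? (Map<?, ?>) attrsObj : Map.of();
        return !"false".equals(attrs.get("data"));
    }
}
```

Branching on the response shape like this is what lets node selection (client-only, data-only) keep working against both old and new clusters.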
I pushed a fix for the NodeInfo parsing bug: this fixes the discovery and the selection of nodes based on some criteria (isClientOnly, isDataOnly, ...). Please note that the bug is not due to this PR; the reason is that the format of the node info changed in 5.x.
re: Node Info - LGTM
This pretty big PR (sorry for the size) changes how the splits are created from an Elasticsearch query.

For clusters running a version prior to v5.x:
* One partition is created for each shard of each requested index.
* If an alias is requested, the search routing and the alias filter are respected.
* Partitions are no longer attached to a node or an IP; only the shard id and index name are defined, so any replica in the cluster can serve the partition.

For clusters running v5.x:
* The query is first split by index, then by shard, and finally by the maximum number of documents allowed per partition (configurable through the new option es.input.maxdocsperpartition).
* If an alias is requested, the search routing and the alias filter are respected.

Fixes #778